2012-04-01 21 views
2

シンプルなsshサーバーを使用してssh bruteforceの試行をキャプチャしようとしていますが、コードは荒くて動作しますが、ユーザー名とパスワードの組み合わせを元のIPアドレスと一致させることはできません。twisted.conchでsshログイン試行のIPを追跡する

これはこれまでのところわかるように簡単ですが、ログに記録せずにstdoutに出力します。私はbuildProtocol定義を使って新しい接続のIPアドレスを出力しました。しかし、私は、ユーザー名とパスワードの資格情報と共にIPアドレスを取得したいので、同時に複数のssh bruteforceの試行を追跡することができます。 IPアドレスは接続が確立されるときにのみ取得でき、複数の接続を同時に追跡することは不可能です。

接続や接続について言及するときは、ログインに成功したとは言えませんが、これは自分のコードでは不可能であり、意図されていません。私はあなたが再接続する前に3回のパスワードの試行を可能にするsshサーバーへの単一の接続を指しています。

from zope.interface import implements 
from twisted.conch.unix import UnixSSHRealm 
from twisted.cred import portal 
from twisted.cred.credentials import IUsernamePassword 
from twisted.cred.checkers import ICredentialsChecker 
from twisted.cred.error import UnauthorizedLogin 
from twisted.conch.ssh import factory, userauth, keys, session 
from twisted.internet import reactor, defer 

with open('id_rsa') as privateBlobFile: 
    privateKey = privateBlobFile.read() 

with open('id_rsa.pub') as publicBlobFile: 
    publicKey = publicBlobFile.read() 

class FailDB: 
    credentialInterfaces = IUsernamePassword, implements(ICredentialsChecker) 

    def requestAvatarId(self, credentials): 
     print"%s:%s" % (credentials.username, credentials.password) 
     return defer.fail(UnauthorizedLogin("invalid password")) 

class UnixSSHdFactory(factory.SSHFactory): 
    publicKeys = { 
     'ssh-rsa': keys.Key.fromString(data=publicKey) 
    } 
    privateKeys = { 
     'ssh-rsa': keys.Key.fromString(data=privateKey) 
    } 
    services = { 
     'ssh-userauth': userauth.SSHUserAuthServer 
    } 

    def buildProtocol(self, addr): 
     print addr 
     return factory.SSHFactory.buildProtocol(self, addr) 

if __name__ == '__main__': 
    portal = portal.Portal(UnixSSHRealm()) 
    portal.registerChecker(FailDB()) 
    UnixSSHdFactory.portal = portal 
    reactor.listenTCP(2022, UnixSSHdFactory()) 
    reactor.run() 

出力例は:

IPv4Address(TCP, '127.0.0.1', 42141) 
root:password123 
root:123456 
root:letmein 

答えて

0

あなたはSSHUserAuthServerから派生して_ebBadAuthメソッドをオーバーライドすることにより、失敗したログイン試行からアドレスを取得することができます。例:

class AuthServer(userauth.SSHUserAuthServer): 
    def _ebBadAuth(self, reason): 
     addr = self.transport.getPeer().address.host 
     print("addr {} failed to log in as {} using {}" 
       .format(addr, self.user, self.method)) 
     userauth.SSHUserAuthServer._ebBadAuth(self, reason) 

あなたはまた、接続が成功することを可能にしたい場合は、あなたはSSHFactory.servicesを上書きするが、唯一のあなたの派生クラスでそれを更新し、factory.services.update({'ssh-userauth': AuthServer})好きではないしなければならないでしょう。

関連する問題